A skewed dataset is one with a significant class imbalance among its keys. Skew leads to slow or failing Spark jobs, which often end in an OOM (out of memory) error.

When performing a join on a skewed dataset, there is usually an imbalance in the key(s) on which the join is performed. As a result, the majority of the data falls onto a single partition, which takes much longer to complete than the other partitions.
Some hints for detecting skewness:

- The key(s) consist mainly of null values, which all fall onto a single partition.
- A small subset of the key values makes up a high percentage of the total rows, and those rows fall onto a single partition.

We will go through both of these cases and see how we can combat them.
Library Imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
Template
spark = (
SparkSession.builder
.master("local")
.appName("Exploring Joins")
.config("spark.some.config.option", "some-value")
.getOrCreate()
)
sc = spark.sparkContext
Situation 2: High Frequency Keys
Initial Datasets
customers = spark.createDataFrame([
(1, "John"),
(2, "Bob"),
], ["customer_id", "first_name"])
customers.toPandas()
| | customer_id | first_name |
|---|---|---|
| 0 | 1 | John |
| 1 | 2 | Bob |
orders = spark.createDataFrame([
(i, 1 if i < 95 else 2, "order #{}".format(i)) for i in range(100)
], ["id", "customer_id", "order_name"])
orders.toPandas().tail(6)
| | id | customer_id | order_name |
|---|---|---|---|
| 94 | 94 | 1 | order #94 |
| 95 | 95 | 2 | order #95 |
| 96 | 96 | 2 | order #96 |
| 97 | 97 | 2 | order #97 |
| 98 | 98 | 2 | order #98 |
| 99 | 99 | 2 | order #99 |
Option 1: Inner Join
df = customers.join(orders, "customer_id")
df.toPandas().tail(10)
| | customer_id | first_name | id | order_name |
|---|---|---|---|---|
| 90 | 1 | John | 90 | order #90 |
| 91 | 1 | John | 91 | order #91 |
| 92 | 1 | John | 92 | order #92 |
| 93 | 1 | John | 93 | order #93 |
| 94 | 1 | John | 94 | order #94 |
| 95 | 2 | Bob | 95 | order #95 |
| 96 | 2 | Bob | 96 | order #96 |
| 97 | 2 | Bob | 97 | order #97 |
| 98 | 2 | Bob | 98 | order #98 |
| 99 | 2 | Bob | 99 | order #99 |
df.explain()
== Physical Plan ==
*(5) Project [customer_id#122L, first_name#123, id#126L, order_name#128]
+- *(5) SortMergeJoin [customer_id#122L], [customer_id#127L], Inner
:- *(2) Sort [customer_id#122L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(customer_id#122L, 200)
: +- *(1) Filter isnotnull(customer_id#122L)
: +- Scan ExistingRDD[customer_id#122L,first_name#123]
+- *(4) Sort [customer_id#127L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(customer_id#127L, 200)
+- *(3) Filter isnotnull(customer_id#127L)
+- Scan ExistingRDD[id#126L,customer_id#127L,order_name#128]
What Happened:
- We want to find what orders each customer made, so we join the customers table to the orders table.
- When performing the join, Spark performs a hashpartitioning on customer_id.
- From our data creation, this means 95% of the data landed on a single partition.
Results:
- Similar to the Null Skew case, that single task/partition will take much longer than the others, and will most likely error out.